feat: add array_exists with lambda support via CometUDF framework #4223
andygrove wants to merge 6 commits into
@hsiang-c fyi
Adds a new JVM UDF bridge framework that allows Spark expressions to be evaluated on the JVM side via Arrow C Data Interface, while keeping the native execution pipeline intact. Includes array_exists as the first lambda-based expression using this framework.
a57dd14 to f1ece6c
Are we planning to merge it ASAP or wait for DF 54.0?
We can try to use apache/datafusion#21903 directly or create
I would love to get the JVM UDF framework in (once reviewed). There are many applications where it can help us get acceleration by default rather than opt-in.
What would be the advantage of waiting for DF 54? Does that give us 100% compatibility for array_exists with lambdas?
I could split the JVM UDF work out into a separate PR, but there would be no tests if we don't have an example of an expression using it.
Having no tests for lambda is fine IMO, as we do not expose the feature to users right away.
That's the entire intention of
My main concern is that we could end up with multiple lambda implementations in DF and in Comet, which might cause confusion and conflicts. The small PoC PR showed the
For customers, we can build another branch on top of the DF54 migration branch and include lambda functions there so they can test it. WDYT?
OK, here is a new PR with just the framework: #4232. Moving this PR to draft.
    "ArrayExistsUDF requires a non-null scalar registry key")
val registryKey = new String(keyVec.get(0), StandardCharsets.UTF_8)
val arrayExistsExpr = CometLambdaRegistry.get(registryKey).asInstanceOf[ArrayExists]
CometLambdaRegistry.register is called on the driver side, while CometLambdaRegistry.get is called on the executor side. Can the expression be obtained correctly?
# Conflicts:
#   native/Cargo.lock
#   native/core/src/execution/planner.rs
#   native/jni-bridge/src/comet_udf_bridge.rs
#   native/spark-expr/src/jvm_udf/mod.rs
Followup to the apache/main merge: the framework's evaluate signature gained numRows in PR apache#4306, but the ArrayExistsUDF override was missed.
… paths
Adapted from PR apache#3611:
- DataFrame API, decimal, date, and timestamp element types
- Literal-only lambda bodies (true / false / null)
- CaseWhen / If in lambda body
- Fallback for binary element type
- Fallback for lambdas capturing outer columns
- Fallback for nested lambda
…istry
The previous CometLambdaRegistry approach registered the ArrayExists expression in a JVM-local ConcurrentHashMap on the driver and looked it up by UUID on the executor. In a real cluster the executor JVM never sees the driver's registry and the lookup fails. Local-mode tests masked this because driver and executor share the same JVM.
Serialize the ArrayExists Catalyst expression to bytes in the serde layer and embed it in the proto as a BinaryType literal arg. The UDF deserializes on each invocation. CometLambdaRegistry is no longer referenced and is removed.
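The difference between the two approaches described in this commit message can be sketched in plain Scala. This is only an illustration under stated assumptions: `GreaterThan`, `LocalRegistry` and `LambdaShipping` are hypothetical names, not Comet or Spark classes, and the hand-rolled `DataOutputStream` encoding stands in for whatever serializer the PR actually uses for the Catalyst expression. Two `LocalRegistry` instances model the driver and executor JVMs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for a lambda body such as `x -> x > threshold`.
final case class GreaterThan(threshold: Int) {
  def eval(x: Int): Boolean = x > threshold
}

// One instance models the JVM-local registry of a single process.
final class LocalRegistry {
  private val entries = mutable.Map.empty[String, GreaterThan]
  def register(key: String, expr: GreaterThan): Unit = entries.update(key, expr)
  def get(key: String): Option[GreaterThan] = entries.get(key)
}

object LambdaShipping {
  // Driver side: encode the expression itself, not a key that points at it.
  def toBytes(expr: GreaterThan): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeUTF("GreaterThan") // type tag, so the reader can validate the payload
    out.writeInt(expr.threshold)
    out.flush()
    buffer.toByteArray
  }

  // Executor side: rebuild the expression from the bytes alone.
  def fromBytes(bytes: Array[Byte]): GreaterThan = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val tag = in.readUTF()
    require(tag == "GreaterThan", s"unexpected expression tag: $tag")
    GreaterThan(in.readInt())
  }
}
```

A key registered in one `LocalRegistry` is invisible to another, which mirrors the cluster failure, whereas the byte array carries everything the receiving side needs. The trade-off noted in the commit message still applies: the payload is decoded on each invocation.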
Which issue does this PR close?
Part of #4193
Rationale for this change
The CometUDF framework merged in #4232 and #4306 lets Comet dispatch scalar UDFs to JVM-side implementations that operate directly on Arrow data, avoiding the cost of falling back to Spark. This PR is the first consumer:
array_exists (Spark's exists(array, x -> predicate(x))) ships as a JVM UDF so we can support a lambda-based expression with 100% Spark compatibility without porting the lambda evaluator to native code.
Performance is 1.8x that of Spark on a 7M-row int-array workload.
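For reference, the semantics that exists(array, x -> predicate(x)) has to reproduce can be sketched without Spark. This is a minimal model of my understanding of Spark's default three-valued behaviour, not code from the PR: `ExistsSemantics` and `greaterThanOne` are hypothetical names, and `None` stands for SQL NULL both for array elements and for predicate results.

```scala
object ExistsSemantics {
  // Returns Some(true) if any element satisfies the predicate,
  // None (NULL) if none does but at least one predicate result is NULL,
  // and Some(false) otherwise. A NULL array yields NULL.
  def exists[A](array: Option[Seq[Option[A]]])(
      predicate: Option[A] => Option[Boolean]): Option[Boolean] =
    array.flatMap { elements =>
      val results = elements.map(predicate)
      if (results.contains(Some(true))) Some(true)
      else if (results.contains(None)) None
      else Some(false)
    }

  // Example predicate `x -> x > 1`: comparing NULL yields NULL.
  val greaterThanOne: Option[Int] => Option[Boolean] = _.map(_ > 1)
}
```

The sketch evaluates every element before deciding, which keeps the three cases easy to read; a real implementation can stop at the first true result without changing the answer.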
What changes are included in this PR?
Experimental support for Spark's exists(array, x -> predicate(x)): the first lambda-based expression accelerated by Comet, built on the CometUDF framework.
- ArrayExistsUDF: iterates ListVector elements, evaluates the lambda predicate via Spark's NamedLambdaVariable, implements three-valued null logic
- CometArrayExists serde: registers the lambda in CometLambdaRegistry and emits a JvmScalarUdf proto invoking ArrayExistsUDF
How are these changes tested?
CometArrayExpressionSuite covers compatibility and fallback paths.
Compatibility (checkSparkAnswerAndOperator):
- literal-only lambda bodies (true, false, null)
- CASE WHEN and IF inside the lambda body
Fallback (checkSparkAnswerAndFallbackReason):